package com.ld.shieldsb.canalclient.etl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.base.Joiner;
import com.ld.shieldsb.canalclient.model.EtlConditions;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.common.core.collections.ListUtils;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;
import com.ld.shieldsb.common.core.util.StringUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * 通用etl服务器，用于不需要区分类型的操作
 * 
 * @ClassName CommonEtlService
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2022年1月13日 下午4:21:19
 *
 */
@Slf4j
public class EtlServiceUtil {
    private static final Map<String, AbstractEtlService> SERVICE_MAP = new ConcurrentHashMap<>(); // key对应的数据源

    public static Map<String, AbstractEtlService> getServiceMap() {
        return SERVICE_MAP;
    }

    /**
     * 导入数据
     */
    public static Result getImportDataNum(String dataSourceKey, String database, String table, EtlConditions etlConditions) {
        String sql = "SELECT * FROM " + database + "." + table;
        Result etlResult = new Result();
        List<String> params = etlConditions.getParams();
        int importType = etlConditions.getType();
        List<String> errMsg = new ArrayList<>();

        try {
            if (StringUtils.isEmpty(dataSourceKey)) {
                return ResultUtil.error("数据源未设置！");
            }
            DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(dataSourceKey);

            if (dataSource == null) {
                return ResultUtil.error("数据源不存在！");
            }

            List<Object> values = new ArrayList<>();
            // 拼接条件
            String etlCondition = etlConditions.getEtlCondition();
            String etlDelta = etlConditions.getEtlDelta();
            sql = dealEtlConditionSql(sql, params, etlCondition, etlDelta, importType, values);

            if (log.isDebugEnabled()) {
                log.debug("etl sql : {}", sql);
            }

            // 获取总数
            long cnt = getCountBySql(sql, dataSource, values);
            etlResult.setData(cnt);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
            errMsg.add(" 数据查询异常 =>" + e.getMessage());
        }
        if (errMsg.isEmpty()) {
            etlResult.setSuccess(true);
        } else {
            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
        }
        return etlResult;
    }

    /**
     * 处理sql的条件
     * 
     * @Title dealEtlConditionSql
     * @author 吕凯
     * @date 2022年1月13日 下午3:21:42
     * @param sql
     * @param params
     * @param importType
     * @param values
     * @return String
     */
    public static String dealEtlConditionSql(String sql, List<String> params, String etlCondition, String etlDelta, int importType,
            List<Object> values) {
        // 去除多余的where条件

        if ((StringUtils.isNotEmpty(etlDelta) || StringUtils.isNotEmpty(etlCondition)) && ListUtils.isNotEmpty(params)) {
            String etlConditionSql = " WHERE ";
            if (importType == 0 && StringUtils.isNotEmpty(etlDelta)) {// 增量导入
                if (params.get(0) != null) {// 参数为第一个且不为null
                    etlConditionSql += etlDelta + ">=?"; // 时间
                } else {
                    params.remove(0);
                }
            }
            if (ListUtils.isNotEmpty(params)) {

                if (StringUtils.isNotEmpty(etlCondition) && StringUtils.isNotEmpty(etlDelta)) {// 增量
                    etlConditionSql += " AND "; // 时间
                }
                if (StringUtils.isNotEmpty(etlCondition)) {// 通用参数
                    etlConditionSql += etlCondition; // 时间
                }
                for (String param : params) {
                    etlConditionSql = etlConditionSql.replace("{}", "?");
                    values.add(param);
                }

                sql += " " + etlConditionSql;
            }
        }
        return sql;
    }

    /**
     * 获取sql结果的条数
     * 
     * @Title getCountBySql
     * @author 吕凯
     * @date 2022年1月13日 下午3:27:50
     * @param sql
     * @param dataSource
     * @param values
     * @return long
     */
    public static long getCountBySql(String sql, DruidDataSource dataSource, List<Object> values) {
        String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
        return (Long) CanalUtil.sqlRS(dataSource, countSql, values, rs -> {
            Long count = null;
            try {
                if (rs.next()) {
                    count = ((Number) rs.getObject(1)).longValue();
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            return count == null ? 0L : count;
        });
    }

}
